/* -*-c++-*- */
/* osgEarth - Geospatial SDK for OpenSceneGraph
 * Copyright 2020 Pelican Mapping
 * http://osgearth.org
 *
 * osgEarth is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>
 */
#ifndef OSGEARTH_THREADING_UTILS_H
#define OSGEARTH_THREADING_UTILS_H 1

#include <osgEarth/Common>
#include <atomic>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <unordered_map>
#include <queue>
#include <thread>
#include <future>
#include <type_traits>

// to include the file and line as the mutex name
#define OE_MUTEX_NAME __FILE__ ":" OE_STRINGIFY(__LINE__)

namespace osgEarth { namespace Threading
{
    //! Interface modelling the C++ BasicLockable named requirement
    //! (lock + unlock), which lets implementations be used with
    //! std::lock_guard.
    class BasicLockable
    {
    public:
        //! Virtual destructor so that an implementation can be destroyed
        //! safely through a pointer to this interface.
        virtual ~BasicLockable() = default;

        //! Acquire the lock.
        virtual void lock() = 0;

        //! Release the lock.
        virtual void unlock() = 0;
    };

    //! Interface modelling the C++ Lockable named requirement:
    //! everything in BasicLockable plus a non-blocking acquisition attempt.
    class Lockable : public BasicLockable
    {
    public:
        //! Attempt to acquire the lock; the boolean result reports
        //! whether the attempt succeeded.
        virtual bool try_lock() = 0;
    };

    /**
     * A normal (non-recursive) mutex.
     *
     * Implements the Lockable interface, so it can be used with
     * std::lock_guard / std::unique_lock. The object is not copyable.
     * The implementation lives outside this header.
     */
    class OSGEARTH_EXPORT Mutex : public Lockable
    {
    public:
        //! Construct an unnamed mutex
        Mutex();

        //! Construct a named mutex.
        //! @param name Name to associate with this mutex
        //! @param file Optional source file name
        //!        (presumably for diagnostics/metrics - TODO confirm in the implementation)
        //! @param line Optional source line number (see "file")
        Mutex(const std::string& name, const char* file=nullptr, std::uint32_t line=0);
        //! Do not permit copy constructor on Mutex
        Mutex(const Mutex& copy) = delete;
        ~Mutex();

        //! Explicitly block the copy operator
        Mutex& operator=(const Mutex& copy) = delete;

        //! Lockable interface
        void lock() override;
        void unlock() override;
        bool try_lock() override;

        //! Assign a name to this mutex
        void setName(const std::string& name);

    private:
        std::string _name;
        // opaque pointer to the underlying mutex object (type hidden from this header)
        void* _handle;
        // opaque pointer to per-mutex metrics data (type hidden from this header)
        void* _metricsData;
    };

    //! Scoped lock that works on anything implementing BasicLockable
    //! (locks on construction, unlocks on destruction).
    typedef std::lock_guard<BasicLockable> ScopedMutexLock;

    /**
     * A recursive mutex.
     *
     * Implements the Lockable interface. The implementation lives outside
     * this header.
     *
     * NOTE(review): unlike Mutex, copy construction and copy assignment are
     * not deleted here even though the class holds raw opaque pointers;
     * confirm that copying is never relied upon.
     */
    class OSGEARTH_EXPORT RecursiveMutex : public Lockable
    {
    public:
        //! Construct an unnamed recursive mutex
        RecursiveMutex();

        //! Construct a named recursive mutex.
        //! @param name Name to associate with this mutex
        //! @param file Optional source file name
        //!        (presumably for diagnostics/metrics - TODO confirm in the implementation)
        //! @param line Optional source line number (see "file")
        RecursiveMutex(const std::string& name, const char* file=nullptr, std::uint32_t line=0);
        ~RecursiveMutex();

        //! Disable this mutex. Don't call this while threads are running.
        //! NOTE(review): only a disable() call is declared; there is no
        //! matching call to re-enable.
        void disable();

        //! Lockable interface
        void lock() override;
        void unlock() override;
        bool try_lock() override;

        //! Assign a name to this mutex
        void setName(const std::string& name);

    private:
        // whether the mutex is active; see disable()
        bool _enabled;
        std::string _name;
        // opaque pointer to the underlying mutex object (type hidden from this header)
        void* _handle;
        // opaque pointer to per-mutex metrics data (type hidden from this header)
        void* _metricsData;
    };

    //! Scoped lock for a RecursiveMutex (locks on construction, unlocks on destruction).
    typedef std::lock_guard<RecursiveMutex> ScopedRecursiveMutexLock;

    /**
     * Gets the approximate number of available threading contexts.
     * Result is guaranteed to be greater than zero
     */
    extern OSGEARTH_EXPORT unsigned getConcurrency();

    /**
     * Gets the unique ID of the running thread.
     */
    extern OSGEARTH_EXPORT unsigned getCurrentThreadId();

    /**
     * Pure interface for an object that can be canceled.
     */
    class Cancelable
    {
    public:
        //! Virtual destructor so that an implementation can be destroyed
        //! safely through a pointer to this interface.
        virtual ~Cancelable() = default;

        //! True if the operation represented by this object was canceled.
        virtual bool isCanceled() const = 0;
    };

    /**
     * Event with a binary signaled state, for multi-threaded synchronization.
     *
     * The event has two states:
     *  "set" means that a call to wait() will not block;
     *  "unset" means that calls to wait() will block until another thread calls set().
     *
     * The event starts out unset.
     *
     * Typical usage: Thread A creates Thread B to run asynchronous code. Thread A
     * then calls wait(), which blocks Thread A. When Thread B is finished, it calls
     * set(). Thread A then wakes up and continues execution.
     *
     * NOTE: ALL waiting threads will wake up when the Event is cleared.
     */
    class OSGEARTH_EXPORT Event
    {
    public:
        //! Construct a new event
        Event();

        //! Construct a new named event
        Event(const std::string& name);

        //! DTOR
        ~Event();

        //! Block until the event is set, then return true.
        bool wait();

        //! Block until the event is set or the timeout expires.
        //! @param timeout_ms Maximum time to wait, in milliseconds
        //! @return true if the event was set, otherwise false.
        bool wait(unsigned timeout_ms);

        //! Like wait(), but resets the state and always returns true.
        bool waitAndReset();

        //! Set the event state, causing any waiters to unblock.
        void set();

        //! Reset (unset) the event state; new waiters will block until set() is called.
        void reset();

        //! Whether the event state is set (waiters will not block).
        //! Reads the flag directly, without acquiring the internal mutex.
        inline bool isSet() const { return _set; }

        //! Assign a name to this event
        void setName(const std::string& name);

    protected:
        Mutex _m;
        std::condition_variable_any _cond;
        // current state: true = "set", false = "unset"
        bool _set;
    };

    /**
     * Future is the consumer-side interface to an asynchronous operation.
     *
     * Usage:
     *   Producer (usually an asynchronous function call) creates a Promise<T>
     *   and immediately returns promise.getFuture(). The Consumer then performs other
     *   work, and eventually (or immediately) checks isAvailable() for a result or
     *   isAbandoned() for cancelation.
     *
     * Copies of a Future (including the one held inside the Promise) share
     * one result container and one Event. Abandonment is detected through
     * the reference count of the shared container.
     */
    template<typename T>
    class Future : public Cancelable
    {
    public:
        typedef std::function<void(const T&)> Callback;

    private:
        // internal structure to track references to the result
        struct Container {
            // value-initialize so that get() on an unresolved future
            // really does return a default object, even for scalar T
            Container() : _obj() { }
            Container(const T& obj) : _obj() { set(obj); }
            // obj is a const reference, so this is (and always was) a copy
            void set(const T& obj) { _obj = obj; }
            T _obj;
            const T& obj() const { return _obj; }
        };

    public:
        //! Blank CTOR
        Future() {
            _ev = std::make_shared<Event>();
            _shared = std::make_shared<Container>();
        }

        //! Named CTOR; the name is passed on to the internal Event
        Future(const std::string& name) {
            _ev = std::make_shared<Event>(name);
            _shared = std::make_shared<Container>();
        }

        //! Copy CTOR; the copy shares the result and event of rhs
        Future(const Future& rhs) :
            _ev(rhs._ev),
            _shared(rhs._shared) { }

        //! Assignment; this future will share the result and event of rhs
        Future<T>& operator = (const Future<T>& rhs) {
            _ev = rhs._ev;
            _shared = rhs._shared;
            return *this;
        }

        //! True if the promise was resolved and a result is available.
        bool isAvailable() const {
            return _ev->isSet();
        }

        //! True if the Promise that generated this Future no longer exists
        //! and the Promise was never resolved. (Detected as: no result
        //! available and this object is the only holder of the shared container.)
        bool isAbandoned() const {
            return !isAvailable() && _shared.use_count() == 1;
        }

        //! Synonym for isAbandoned. (Cancelable interface)
        bool isCanceled() const override {
            return isAbandoned();
        }

        //! Dereference the result object. Make sure you check isAvailable()
        //! to check that the future was actually resolved; otherwise you
        //! will just get the default object.
        const T& get() const {
            return _shared->obj();
        }

        //! Dereference the result object and then reset the future to
        //! its uninitialized state.
        //! @return A copy of the result object
        T release() {
            T result = get();
            reset();
            return result;
        }

        //! Blocks until the result becomes available or the future is abandoned;
        //! then returns the result object.
        const T& join() const {
            // poll in 1ms slices so abandonment is noticed while waiting
            while (
                !isAbandoned() &&
                !_ev->wait(1u));
            return get();
        }

        //! Blocks until the result becomes available or the future is abandoned
        //! or a cancelation flag is set; then returns the result object.
        //! @param cancelable Optional object to poll for cancelation (may be null)
        const T& join(const Cancelable* cancelable) const {
            while (
                !isAvailable() &&
                !isAbandoned() &&
                !(cancelable && cancelable->isCanceled()))
            {
                _ev->wait(1u);
            }
            return get();
        }

        //! Release reference to a promise, resetting this future to its default state.
        //! Only this object is affected: it detaches from the shared result
        //! and from the shared event. The previous event is left untouched
        //! because the Promise and any other Future copies still use it, and
        //! resetting it would make an already-resolved result look unresolved
        //! to them. The replacement event is unnamed.
        void abandon() {
            _shared = std::make_shared<Container>();
            _ev = std::make_shared<Event>();
        }

        //! synonym for abandon.
        void reset() {
            abandon();
        }

    private:
        std::shared_ptr<Event> _ev;
        std::shared_ptr<Container> _shared;
        template<typename U> friend class Promise;
    };

    /**
     * Promise is the producer-side interface to an asynchronous operation.
     *
     * Usage: The code that initiates an asychronous operation creates a Promise
     *   object, dispatches the asynchronous code, and immediately returns
     *   Promise.getFuture(). The caller can then call future.get() to block until
     *   the result is available.
     */
    template<typename T>
    class Promise : public Cancelable
    {
    public:
        Promise() { }

        Promise(const std::string& name) : _future(name) { }

        //! This promise's future result.
        Future<T> getFuture() const { return _future; }

        //! Resolve (fulfill) the promise with the provided result value.
        void resolve(const T& value) {
            _future._shared->set(value);
            _future._ev->set();
        }

        //! Resolve (fulfill) the promise with a default result.
        void resolve() {
            _future._ev->set();
        }

        //! True if the promise is resolved and the Future holds a valid result.
        bool isResolved() const {
            return _future._ev->isSet();
        }

        //! True is there are no Future objects waiting on this Promise.
        bool isAbandoned() const {
            return _future._shared.use_count() == 1;
        }

        bool isCanceled() const override {
            return isAbandoned();
        }

    private:
        Future<T> _future;
    };

    /**
     * Convenience base class for representing a Result object that may be
     * synchronous or asynchronous, depending on which constructor you use.
     */
    template<typename T>
    class FutureResult
    {
    public:
        bool isReady() const { 
            return _future.isAvailable() || _future.isAbandoned();
        }

        bool isWorking() const {
            return !_future.isAvailable() && !_future.isAbandoned();
        }

    protected:
        //! Asynchronous constructor
        FutureResult(Future<T> f) : _future(f) { }

        //! Immediate synchronous resolve constructor
        FutureResult(const T& data) {
            Promise<T> p;
            _future = p.getFuture();
            p.resolve(data);
        }

        Future<T> _future;
    };


    /**
     * Mutex that locks on a per-key basis: holders of different keys do not
     * block each other, while a second lock() on a key that is already held
     * waits until that key is unlocked.
     */
    template<typename T>
    class Gate
    {
    public:
        //! Construct an unnamed gate
        Gate() { }

        //! Construct a gate whose internal mutex carries the given name
        Gate(const std::string& name) :
            _m(name)
        {
        }

        //! Block until nobody holds "key", then take it.
        inline void lock(T key)
        {
            std::unique_lock<Mutex> guard(_m);
            // operator[] creates a zero entry for a key seen for the first time
            _unlocked.wait(guard, [this, &key]() { return !(_users[key] > 0); });
            ++_users[key];
        }

        //! Give "key" back; when its count reaches zero the entry is
        //! dropped and all waiters are woken to re-check their keys.
        inline void unlock(T key)
        {
            std::unique_lock<Mutex> guard(_m);
            int& holders = _users[key];
            --holders;
            if (holders == 0)
            {
                _users.erase(key);
                _unlocked.notify_all();
            }
        }

        //! Name the internal mutex
        inline void setName(const std::string& name)
        {
            _m.setName(name);
        }

    private:
        Mutex _m;
        std::condition_variable_any _unlocked;
        // per-key usage count
        std::unordered_map<T, int> _users;
    };

    //! Scoped lock on one key of a Gate: takes the key on construction
    //! and gives it back on destruction.
    template<typename T>
    struct ScopedGate {
        Gate<T>& _gate;
        T _key;

        ScopedGate(Gate<T>& gate, T key) :
            _gate(gate),
            _key(key)
        {
            _gate.lock(_key);
        }

        ~ScopedGate()
        {
            _gate.unlock(_key);
        }
    };

    /**
     * Mutex that allows many simultaneous readers but only one writer.
     *
     * T is the underlying mutex type; it must work with std::unique_lock
     * and std::condition_variable_any. The named constructor and setName()
     * additionally require T to support construction from a string and
     * a setName() method.
     */
    template<typename T>
    class ReadWrite
    {
    public:
        //! Construct an unnamed read/write mutex
        ReadWrite() :
            _writers(0u), _readers(0u) { }

        //! Construct a read/write mutex whose internal mutex carries the given name
        ReadWrite(const std::string& name) :
            _m(name), _writers(0u), _readers(0u) { }

        //! Acquire shared (read) access; blocks while a writer is active.
        void read_lock() {
            std::unique_lock<T> lock(_m);
            _unlocked.wait(lock, [&]() { return _writers == 0; });
            ++_readers;
        }

        //! Release shared (read) access.
        void read_unlock() {
            std::unique_lock<T> lock(_m);
            --_readers;
            // Readers never wait while other readers are active, so only
            // writers can be waiting here and only one of them can proceed.
            if (_readers == 0)
                _unlocked.notify_one();
        }

        //! Acquire exclusive (write) access; blocks while any reader or
        //! another writer is active.
        void write_lock() {
            std::unique_lock<T> lock(_m);
            _unlocked.wait(lock, [&]() { return _writers == 0 && _readers == 0; });
            ++_writers;
        }

        //! Release exclusive (write) access.
        void write_unlock() {
            std::unique_lock<T> lock(_m);
            _writers = 0;
            // Wake everyone: any number of waiting readers may now proceed
            // together. Waking a single waiter would leave the other readers
            // blocked until some later unlock happened to notify them.
            _unlocked.notify_all();
        }

        //! Name the internal mutex
        void setName(const std::string& name) {
            _m.setName(name);
        }

    private:
        T _m;
        std::condition_variable_any _unlocked;
        unsigned _writers;
        unsigned _readers;
    };

    //! Scoped exclusive (write) lock on a ReadWrite mutex: acquired on
    //! construction, released on destruction.
    template<typename T>
    struct ScopedWrite {
        ScopedWrite( ReadWrite<T>& rw ) :
            _lock(rw)
        {
            _lock.write_lock();
        }

        ~ScopedWrite()
        {
            _lock.write_unlock();
        }

    private:
        ReadWrite<T>& _lock;
    };

    //! Scoped shared (read) lock on a ReadWrite mutex: acquired on
    //! construction, released on destruction.
    template<typename T>
    struct ScopedRead {
        ScopedRead( ReadWrite<T>& rw ) :
            _lock(rw)
        {
            _lock.read_lock();
        }

        ~ScopedRead()
        {
            _lock.read_unlock();
        }

    private:
        ReadWrite<T>& _lock;
    };

    // Read/write mutexes built on the two mutex flavors
    using ReadWriteMutex = ReadWrite<Mutex>;
    using ReadWriteRecursiveMutex = ReadWrite<RecursiveMutex>;

    // Scoped read/write locks for ReadWriteMutex
    using ScopedReadLock = ScopedRead<Mutex>;
    using ScopedWriteLock = ScopedWrite<Mutex>;

    // Scoped read/write locks for ReadWriteRecursiveMutex
    using ScopedRecursiveReadLock = ScopedRead<RecursiveMutex>;
    using ScopedRecursiveWriteLock = ScopedWrite<RecursiveMutex>;

    /**
     * Simple convenience construct to make another type "lockable"
     * as long as it has a default constructor
     */
    template<typename T>
    struct Mutexed : public T, public BasicLockable {
        Mutexed() : T() { }
        Mutexed(const std::string& name) : _lockable_mutex(name), T() { }
        void setName(const std::string& name) { _lockable_mutex.setName(name); }
        void lock() { _lockable_mutex.lock(); }
        void lock() const { _lockable_mutex.lock(); }
        void unlock() { _lockable_mutex.unlock(); }
        void unlock() const { _lockable_mutex.unlock(); }
        void scoped_lock(std::function<void()> func) { lock(); func(); unlock(); }
        Mutex& mutex() const { return _lockable_mutex; }
    private:
        mutable Mutex _lockable_mutex;
    };


    /**
     * Scope guard around an atomic integer: adds one to it when the guard
     * is created and subtracts one when the guard is destroyed.
     */
    struct ScopedAtomicCounter
    {
        std::atomic_int& _a;

        ScopedAtomicCounter(std::atomic_int& a) :
            _a(a)
        {
            _a.fetch_add(1);
        }

        ~ScopedAtomicCounter()
        {
            _a.fetch_sub(1);
        }
    };

    //! Sets the name of the current thread
    extern OSGEARTH_EXPORT void setThreadName(const std::string& name);

    //! Scope guard for the thread name: sets it to "base(detail)" on
    //! construction and back to plain "base" on destruction.
    struct ScopedThreadName
    {
        std::string _base;

        ScopedThreadName(const std::string& base, const std::string& detail) :
            _base(base)
        {
            std::string decorated(base);
            decorated += "(";
            decorated += detail;
            decorated += ")";
            setThreadName(decorated);
        }

        ~ScopedThreadName()
        {
            setThreadName(_base);
        }
    };

    /**
     * Semaphore lets N users acquire it and then notifies when the
     * count goes back down to zero.
     *
     * Only declared here; the implementation lives outside this header.
     */
    class Semaphore
    {
    public:
        //! Construct a semaphore
        Semaphore();

        //! Construct a named semaphore
        Semaphore(const std::string& name);

        //! Acquire, increasing the usage count by one
        void acquire();

        //! Release, decreasing the usage count by one.
        //! When the count reaches zero, joiners will be notified and
        //! the semaphore will reset to its initial state.
        void release();

        //! Reset to initial state; this will cause a join to occur
        //! even if no acquisitions have taken place.
        void reset();

        //! Current count in the semaphore
        std::size_t count() const;

        //! Block until the semaphore count returns to zero.
        //! (It must first have left zero)
        //! Warning: this method will block forever if the count
        //! never reaches zero!
        void join();

        //! Block until the semaphore count returns to zero, or
        //! the operation is canceled.
        //! (It must first have left zero)
        //! @param cancelable Object to check for cancelation
        void join(Cancelable* cancelable);

    private:
        // usage count (stored as int; count() reports it as std::size_t)
        int _count;
        std::condition_variable_any _cv;
        // mutable so it can be locked from const methods such as count()
        mutable Mutex _m;
    };

    class JobArena;

    /**
     * A job group. Dispatch jobs along with a group, and you 
     * can then wait on the entire group to finish.
     *
     * The group holds a shared Semaphore; JobArena is a friend so that it
     * can reach that semaphore.
     */
    class OSGEARTH_EXPORT JobGroup
    {
    public:
        //! Construct a new job group
        JobGroup();

        //! Construct a new named job group
        JobGroup(const std::string& name);

        //! Block until all jobs dispatched under this group are complete.
        void join();

        //! Block until all jobs dispatched under this group are complete,
        //! or the operation is canceled.
        void join(Cancelable*);

    private:
        // semaphore shared with the jobs queued under this group
        std::shared_ptr<Semaphore> _sema;
        friend class JobArena;
    };

    /**
     * Describes a task to run in the background and provides the API
     * for dispatching it.
     *
     * Example usage:
     *
     *   int a = 10, b = 20;
     *
     *   Job job;
     *
     *   Future<int> result = job.dispatch<int>(
     *      [a, b](Cancelable* progress) {
     *          return (a + b);
     *      }
     *   );
     *
     *   // later...
     *
     *   if (result.isAvailable()) {
     *       std::cout << "Answer = " << result.get() << std::endl;
     *   }
     *   else if (result.isAbandoned()) {
     *       // task was canceled
     *   }
     *
     * @notes After calling dispatch you may throw the Job away, or keep
     *        it and dispatch it again later. Changes made to a Job after
     *        a dispatch do not affect the Job that was already dispatched.
     */
    class Job
    {
    public:

        //! Construct a new blank job (no arena, no group, priority zero)
        Job() :
            _priority(0.0f),
            _arena(nullptr),
            _group(nullptr)
        {
        }

        //! Construct a new job that runs in the given arena
        Job(JobArena* arena) :
            _priority(0.0f),
            _arena(arena),
            _group(nullptr)
        {
        }

        //! Construct a new job that runs in the given arena as part of
        //! the given job group
        Job(JobArena* arena, JobGroup* group) :
            _priority(0.0f),
            _arena(arena),
            _group(group)
        {
        }

        //! Name of this job
        void setName(const std::string& value)
        {
            _name = value;
        }
        const std::string& getName() const
        {
            return _name;
        }

        //! Arena in which to run this job, given directly...
        void setArena(JobArena* arena)
        {
            _arena = arena;
        }
        //! ...or looked up by name
        inline void setArena(const std::string& arenaName);

        //! Fixed priority value; used when no priority function is set
        void setPriority(float value)
        {
            _priority = value;
        }

        //! Function that is called to compute the priority on demand;
        //! when set, it takes precedence over the fixed priority
        void setPriorityFunction(const std::function<float()>& func)
        {
            _priorityFunc = func;
        }

        //! Priority of this job: the result of the priority function if
        //! there is one, otherwise the fixed priority
        float getPriority() const
        {
            if (_priorityFunc)
                return _priorityFunc();
            return _priority;
        }

        //! Group this job belongs to (may be null)
        void setGroup(JobGroup* group)
        {
            _group = group;
        }
        JobGroup* getGroup() const
        {
            return _group;
        }

        //! Dispatch this job for asynchronous execution.
        //! @func Function to execute
        //! @return Future result. If this object goes out of scope,
        //!         the job will be canceled and may not run at all.
        template<typename T>
        Future<T> dispatch(
            std::function<T(Cancelable*)> func) const;

        //! Dispatch this job for asynchronous execution without a
        //! result ("fire and forget").
        inline void dispatch(
            std::function<void(Cancelable*)> func) const;

    private:
        std::string _name;
        float _priority;
        JobArena* _arena;
        JobGroup* _group;
        std::function<float()> _priorityFunc;
        friend class JobArena;
    };


    /**
     * Schedules asynchronous tasks on a thread pool.
     * You usually don't need to use this class directly.
     * Use Job::dispatch() to queue a new job.
     */
    class OSGEARTH_EXPORT JobArena
    {
    public:
        //! Type of Job Arena (thread pool versus traversal)
        enum Type
        {
            THREAD_POOL,
            UPDATE_TRAVERSAL
        };

        //! Construct a new JobArena
        //! @param name Name of the arena
        //! @param concurrency Target number of concurrent threads
        //! @param type Kind of arena to create
        JobArena(
            const std::string& name,
            unsigned concurrency = 2u,
            const Type& type = THREAD_POOL);

        //! Destroy
        ~JobArena();

        //! Set the concurrency of this job arena
        //! (Only applies to THREAD_POOL type arenas)
        void setConcurrency(unsigned value);

    public: // statics

        //! Access a named arena
        static JobArena* get(const std::string& name);

        //! Access the first arena of type "type". Typically use this to
        //! access an UPDATE_TRAVERSAL arena singleton.
        static JobArena* get(const Type& type);

        //! Sets the concurrency of a named arena
        static void setConcurrency(const std::string& name, unsigned value);

        //! Name of the arena to use when none is specified
        static const std::string& defaultArenaName();

        //! Run one or more pending jobs.
        //! Internal function - do not call this directly.
        void runJobs();

        /**
         * Reflects the current state of the JobArena system
         * This structure is designed to be accessed atomically
         * with no lock contention
         */
        class OSGEARTH_EXPORT Metrics
        {
        public:
            //! Per-arena metrics. ALWAYS check the "active" flag
            struct Arena {
                // NOTE: "active" and "arenaName" are plain (non-atomic)
                // members; only the counters below are atomic.
                bool active;
                std::string arenaName;
                std::atomic<int> concurrency;
                std::atomic<int> numJobsPending;
                std::atomic<int> numJobsRunning;
                std::atomic<int> numJobsCanceled;

                //! Start out inactive with all counters at zero
                Arena() : active(false), concurrency(0), numJobsPending(0), numJobsRunning(0), numJobsCanceled(0) { }

                //! Mark this slot inactive and zero the job counters.
                //! ("concurrency" and "arenaName" are left unchanged.)
                void free() {
                    active = false, numJobsPending = 0, numJobsRunning = 0,
                        numJobsCanceled = 0;
                }
            };

            //! Report sent to the user reporting function if set
            struct Report {
                Report(const Job& job_, const std::string& arena_, const std::chrono::steady_clock::duration& duration_)
                    : job(job_), arena(arena_), duration(duration_) { }
                // held by reference: the Job must outlive the Report
                const Job& job;
                std::string arena;
                std::chrono::steady_clock::duration duration;
            };

            std::atomic<int> maxArenaIndex;

            //! metrics about the arena at index "index"
            const Arena& arena(int index) const;

            //! Total number of pending jobs across all arenas
            int totalJobsPending() const;

            //! Total number of running jobs across all arenas
            int totalJobsRunning() const;

            //! Total number of canceled jobs across all arenas
            int totalJobsCanceled() const;

            //! Total number of active jobs in the system
            //! (pending plus running)
            int totalJobs() const {
                return totalJobsPending() + totalJobsRunning();
            }

            //! Set a user reporting function and threshold
            //! @param function Function to receive each Report
            //! @param minDuration Duration threshold stored with the function
            //!        (presumably jobs shorter than this are not reported - TODO confirm)
            void setReportFunction(
                std::function<void(const Report&)> function,
                std::chrono::steady_clock::duration minDuration = std::chrono::steady_clock::duration(0))
            {
                _report = function;
                _reportMinDuration = minDuration;
            }

        private:
            // only JobArena (a friend) can create the metrics object
            Metrics();
            std::vector<Arena> _arenas;
            std::function<void(const Report&)> _report;
            std::chrono::steady_clock::duration _reportMinDuration;
            friend class JobArena;
        };

        //! Access to the system-wide job arena metrics
        static const Metrics& metrics() { return _allMetrics; }

    private:

        void startThreads();

        void stopThreads();

        static void shutdownAll();

        // Type-erased unit of work stored in the queue; returns a bool.
        // The delegates built by Job::dispatch return false when the job
        // was skipped because its promise had been abandoned.
        using Delegate = std::function<bool()>;

        //! Schedule an asynchronous task on this arena.
        //! Use Job::dispatch to run jobs (no need to call this directly)
        //! @param job Job details
        //! @param delegate Function to execute
        void dispatch(
            const Job& job,
            Delegate& delegate);

        // One entry in the job queue: a copy of the Job description, the
        // work to run, and the (optional) semaphore of the job's group.
        struct QueuedJob {
            QueuedJob() { }
            QueuedJob(const Job& job, const Delegate& delegate, std::shared_ptr<Semaphore> sema) :
                _job(job), _delegate(delegate), _groupsema(sema) { }
            Job _job;
            Delegate _delegate;
            std::shared_ptr<Semaphore> _groupsema;
            // orders entries by job priority (priority is re-evaluated on each comparison)
            bool operator < (const QueuedJob& rhs) const { 
                return _job.getPriority() < rhs._job.getPriority();
            }
        };

        // pool name
        std::string _name;
        // type of arena
        Type _type;
        // queued operations to run asynchronously
        using Queue = std::vector<QueuedJob>;
        Queue _queue;
        // protect access to the queue
        mutable Mutex _queueMutex;
        mutable Mutex _quitMutex;
        // target number of concurrent threads in the pool
        std::atomic<unsigned> _targetConcurrency;
        // thread waiter block
        std::condition_variable_any _block;
        // set to true when threads should exit
        bool _done;
        // threads in the pool
        std::vector<std::thread> _threads;
        // pointer to the stats structure for this arena
        Metrics::Arena* _metrics;

        // state shared by all arenas
        static Mutex _arenas_mutex;
        static std::unordered_map<std::string, unsigned> _arenaSizes;
        static std::unordered_map<std::string, std::shared_ptr<JobArena>> _arenas;
        static std::string _defaultArenaName;
        static Metrics _allMetrics;

        friend class Job; // allow access to private dispatch method
    };

    // Job inlines
    void Job::setArena(const std::string& arenaName)
    {
        setArena(JobArena::get(arenaName));
    }

    // Dispatch a job that produces a result of type T.
    // A Promise is created here; a copy of it travels with the queued work,
    // and the matching Future is returned to the caller.
    template<typename T>
    Future<T> Job::dispatch(
        std::function<T(Cancelable*)> function) const
    {
        Promise<T> promise;
        Future<T> future = promise.getFuture();
        JobArena::Delegate delegate = [function, promise]() mutable
        {
            // Skip the work if nobody holds the Future any more.
            bool good = !promise.isAbandoned();
            if (good)
                // the promise doubles as the Cancelable handed to the job
                promise.resolve(function(&promise));
            return good;
        };
        // no arena set on this job: look one up with an empty name
        JobArena* arena = _arena ? _arena : JobArena::get("");
        arena->dispatch(*this, delegate);
        // return the local by name; wrapping it in std::move would block
        // copy elision and gain nothing
        return future;
    }

    // Dispatch a job that produces no result ("fire and forget").
    // The function is called with a null Cancelable, and the queued
    // delegate always reports true.
    void Job::dispatch(
        std::function<void(Cancelable*)> function) const
    {
        // no arena set on this job: look one up with an empty name
        JobArena* target = (_arena != nullptr) ? _arena : JobArena::get("");

        JobArena::Delegate task = [function]() -> bool
        {
            function(nullptr);
            return true;
        };

        target->dispatch(*this, task);
    }

} } // namespace osgEarth::Threading

// Sets the name of the calling thread.
// NOTE: the expansion already ends with a semicolon.
#define OE_THREAD_NAME(name) osgEarth::Threading::setThreadName(name);

// Sets the thread name to "base(name)" for the rest of the enclosing scope
// and back to "base" when the scope ends. Declares a local variable called
// _scoped_threadName, so it can be used only once per scope.
// NOTE: the expansion already ends with a semicolon.
#define OE_SCOPED_THREAD_NAME(base,name) osgEarth::Threading::ScopedThreadName _scoped_threadName(base,name);

#endif // OSGEARTH_THREADING_UTILS_H
